# [十二] Spring的事务管理 - 声明式事务
# Spring的事务管理
导读
在Spring中,事务也是用 AOP 切面技术来实现的,有两种实现方式:
- 声明式事务管理: 基于
Spring AOP
实现。其本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务。
- 声明式事务管理: 基于
- 编程式事务管理: 编程式事务管理使用
TransactionTemplate
可实现更细粒度的事务控制。
- 编程式事务管理: 编程式事务管理使用
声明式事务管理不需要入侵代码,通过@Transactional
就可以进行事务操作,更快捷而且简单且大部分业务都可以满足,推荐使用。
其实不管是编程式事务还是声明式事务,最终调用的底层核心代码是一致的。
# 声明式事务管理
1、 首先创建一个配置类,通过@EnableTransactionManagement
开启事务管理功能
@Component
@EnableTransactionManagement(proxyTargetClass = false)
@MapperScan(basePackages = {"com.xiangxue.jack.dao"},annotationClass = Repository.class)
public class EnableTransactionManagementBean {
/*
* 以下两个@Bean的配置都可以设置数据源
* 通过SqlSessionFactoryBean设置数据源
*/
@Bean
public SqlSessionFactoryBean sqlSessionFactoryBean(DataSource dataSource) {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
return sqlSessionFactoryBean;
}
/*
* 通过事务管理平台的方式设置数据源
*/
@Bean
public PlatformTransactionManager annotationDrivenTransactionManager(DataSource dataSource) {
DataSourceTransactionManager dtm = new DataSourceTransactionManager();
dtm.setDataSource(dataSource);
return dtm;
}
}
2、定义数据源
@Configuration
@PropertySource("classpath:config/core/core.properties")
public class DataSourceConfiguration {
@Value("${jdbc.driverClassName}")
private String driverClass;
@Value("${jdbc.url:jdbc}")
private String jdbcUrl;
@Value("${jdbc.username}")
private String user;
@Value("${jdbc.password}")
private String password;
@Bean
public DataSource comboPooledDataSource() {
ComboPooledDataSource comboPooledDataSource = new ComboPooledDataSource();
try {
comboPooledDataSource.setDriverClass(driverClass);
comboPooledDataSource.setJdbcUrl(jdbcUrl);
comboPooledDataSource.setUser(user);
comboPooledDataSource.setPassword(password);
comboPooledDataSource.setMinPoolSize(10);
comboPooledDataSource.setMaxPoolSize(100);
comboPooledDataSource.setMaxIdleTime(1800);
comboPooledDataSource.setAcquireIncrement(3);
comboPooledDataSource.setMaxStatements(1000);
comboPooledDataSource.setInitialPoolSize(10);
comboPooledDataSource.setIdleConnectionTestPeriod(60);
comboPooledDataSource.setAcquireRetryAttempts(30);
comboPooledDataSource.setBreakAfterAcquireFailure(false);
comboPooledDataSource.setTestConnectionOnCheckout(false);
comboPooledDataSource.setAcquireRetryDelay(100);
} catch (PropertyVetoException e) {
e.printStackTrace();
}
return comboPooledDataSource;
}
}
jdbc.driverClassName = org.gjt.mm.mysql.Driver
jdbc.url = jdbc:mysql://127.0.0.1:3306/consult
jdbc.username = root
jdbc.password = 123456
知识点
数据源和事务管理平台的加载都是在类ProxyTransactionManagementConfiguration
中进行的。Spring中的事务就是由 connection
对象控制的,所以connection
对象是和事务是绑定的,然后用户请求过来时又需要数据库连接来执行 sql 语句, 所以用户请求线程又是跟 connection
是绑定的。
# 开启事务注解解析流程
首先看以下
@EnableTransactionManagement
注解的源码
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
// proxyTargetClass = false 表示是JDK动态代理支持接口代理。
// true表示是Cglib代理支持子类继承代理。
boolean proxyTargetClass() default false;
// 事务通知模式(切面织入方式),默认代理模式(同一个类中方法互相调用拦截器不会生效)
AdviceMode mode() default AdviceMode.PROXY;
// 连接点上有多个通知时,排序,默认最低。值越大优先级越低。
int order() default Ordered.LOWEST_PRECEDENCE;
}
进入
TransactionManagementConfigurationSelector
类
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override
protected String[] selectImports(AdviceMode adviceMode) {
/**
* TODO : 导入需要加载的类
* 1、AutoProxyRegistrar: 给容器中注册一个InfrastructureAdvisorAutoProxyCreator(Spring事务入口类) 组件;
* 利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),
* 代理对象执行方法利用拦截器链进行调用;
*
* 2、ProxyTransactionManagementConfiguration: 就是一个配置类,定义了事务增强器、拦截器。
*
*/
switch (adviceMode) {
case PROXY:
// 重点看这2个类:
// AutoProxyRegistrar
// ProxyTransactionManagementConfiguration
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}
}
那么这个
@EnableTransactionManagement
开启事务管理配置的注解是如何被调用的呢?我们回顾一下Spring的初始化核心流程
# Spring 初始化核心流程回顾
spring容器初始化的核心方法AbstractApplicationContext#refresh ,
├─
refresh Spring 初始化核心流程入口│ ├─
prepareRefresh ① 准备此上下文用于刷新,设置启动时间和active标志,初始化属性│ ├─
obtainFreshBeanFactory ② 创建 BeanFactory│ ├─
prepareBeanFactory ③ 设置 BeanFactory 的基本属性│ ├─
postProcessBeanFactory ④ 子类处理自定义的BeanFactoryPostProcess│ ├─
invokeBeanFactoryPostProcessors ⑤ 开启事务管理配置的调用入口│ ├─
registerBeanPostProcessors ⑥ 注册拦截Bean创建的Bean处理器│ ├─
initMessageSource ⑦ 初始化上下文中的资源文件,如国际化文件的处理等│ ├─
initApplicationEventMulticaster ⑧ 初始化上下文的事件传播器│ ├─
onRefresh ⑨ 给子类扩展初始化其他Bean,springboot 中用来做内嵌 tomcat 启动│ ├─
registerListeners ⑩ 在所有bean中查找监听 bean,然后注册到广播器中│ ├─
finishBeanFactoryInitialization ⑪ 初始化所有的单例Bean、ioc、BeanPostProcessor的执行、Aop入口│ └─
finishRefresh ⑫ 完成刷新过程,发布相应的事件
进入
invokeBeanFactoryPostProcessors()
方法类文件: org.springframework.beans.factory.support.
AbstractApplicationContext
protected void invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory beanFactory) {
// 看这个 invokeBeanFactoryPostProcessors 方法
PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());
if (beanFactory.getTempClassLoader() == null && beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
}
}
进入
invokeBeanFactoryPostProcessors()
方法类文件: org.springframework.beans.factory.support.
PostProcessorRegistrationDelegate
public static void invokeBeanFactoryPostProcessors(
ConfigurableListableBeanFactory beanFactory, List<BeanFactoryPostProcessor> beanFactoryPostProcessors) {
/** ...... 省略 ...... **/
/**
* 调用过程
* 在这里典型的 BeanDefinitionRegistryPostProcessor 就是 ConfigurationClassPostProcessor
* 用于进行bean定义的加载 比如我们的包扫描,@import等
* @EnableAspectJAutoProxy 注解中的 AspectJAutoProxyRegistrar 类就是通过此方法调用执行
* @EnableTransactionManagement 注解中的 TransactionManagementConfigurationSelector 类就是通过此方法调用执行
*/
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
// 调用完之后,马上clear
currentRegistryProcessors.clear();
}
进入
invokeBeanDefinitionRegistryPostProcessors()
方法类文件: org.springframework.beans.factory.support.
PostProcessorRegistrationDelegate
private static void invokeBeanDefinitionRegistryPostProcessors(
Collection<? extends BeanDefinitionRegistryPostProcessor> postProcessors, BeanDefinitionRegistry registry) {
for (BeanDefinitionRegistryPostProcessor postProcessor : postProcessors) {
// 重点看这个方法
postProcessor.postProcessBeanDefinitionRegistry(registry);
}
}
进入
postProcessBeanDefinitionRegistry()
方法所在类 org.springframework.context.annotation.
ConfigurationClassPostProcessor
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
int registryId = System.identityHashCode(registry);
if (this.registriesPostProcessed.contains(registryId)) {
throw new IllegalStateException(
"postProcessBeanDefinitionRegistry already called on this post-processor against " + registry);
}
if (this.factoriesPostProcessed.contains(registryId)) {
throw new IllegalStateException(
"postProcessBeanFactory already called on this post-processor against " + registry);
}
this.registriesPostProcessed.add(registryId);
// 重点看这个方法
processConfigBeanDefinitions(registry);
}
知识点
ConfigurationClassPostProcessor
类非常重要,此类是对Spring容器中所有的BeanDefinition
的新增、修改等操作,也就是说对BeanDefinition
的增删查改都会在ConfigurationClassPostProcessor
类中进行。同时还支持了@Import、@ImportResource、@ComponetScan、@Configruration等注解的解析工作。
进入
processConfigBeanDefinitions()
方法,这个时候bean
还没有实例化类文件: org.springframework.beans.factory.support.
ConfigurationClassPostProcessor
public void processConfigBeanDefinitions(BeanDefinitionRegistry registry) {
/** ...... 省略 ...... **/
do {
// 进入parse方法
parser.parse(candidates);
parser.validate();
}
while (!candidates.isEmpty());
}
接下来的调用的过程较为繁琐,此处省略无关的源码,按照如下步骤自行跟踪源码即可
进入
parse
方法,进入子
parse
方法,进入
processConfigurationClass
方法,进入
doProcessConfigurationClass
方法,最后进入
processImports
方法,类文件: org.springframework.beans.factory.support.
ConfigurationClassParser
到此处,selectImports()
方法最终会被ConfigurationClassParser
类的processImports()
方法调用执行,具体见第ConfigurationClassParser
类的第569
行,
private void processImports(ConfigurationClass configClass, SourceClass currentSourceClass,
Collection<SourceClass> importCandidates, boolean checkForCircularImports) {
/** ...... 省略 ...... **/
if (candidate.isAssignable(ImportSelector.class)) {
// Candidate class is an ImportSelector -> delegate to it to determine imports
Class<?> candidateClass = candidate.loadClass();
// 反射拿到 ImportSelector 接口类的实例化对象 selector
ImportSelector selector = BeanUtils.instantiateClass(candidateClass, ImportSelector.class);
ParserStrategyUtils.invokeAwareMethods(
selector, this.environment, this.resourceLoader, this.registry);
if (selector instanceof DeferredImportSelector) {
this.deferredImportSelectorHandler.handle(
configClass, (DeferredImportSelector) selector);
}
else {
/*
* 然后通过 selector调用其实现类 AdviceModeImportSelector
* 的selectImports方法,模板中的钩子方法。最后拿到返回的
* TransactionManagementConfigurationSelector的selectImports
* 方法返回的两个类:
* 1、AutoProxyRegistrar.class
* 2、ProxyTransactionManagementConfiguration.class
* 继续递归调用当前的 processImports 方法
*/
String[] importClassNames = selector.selectImports(currentSourceClass.getMetadata());
Collection<SourceClass> importSourceClasses = asSourceClasses(importClassNames);
processImports(configClass, currentSourceClass, importSourceClasses, false);
}
}
}
# 开启事务的入口类
进入
selectImports()
方法,找到其实现类AdviceModeImportSelector
的selectImports
方法类文件: org.springframework.context.annotation.
AdviceModeImportSelector
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (attributes == null) {
throw new IllegalArgumentException(String.format(
"@%s is not present on importing class '%s' as expected",
annType.getSimpleName(), importingClassMetadata.getClassName()));
}
AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
// 此方法最终会调用TransactionManagementConfigurationSelector
// 的 selectImports方法
String[] imports = selectImports(adviceMode);
if (imports == null) {
throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
}
return imports;
}
调回到
selectImports
方法后,会把AutoProxyRegistrar
和ProxyTransactionManagementConfiguration
两个类注册到Beandefinition
中
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override
protected String[] selectImports(AdviceMode adviceMode) {
/**
* TODO : 导入需要加载的类
* 1、AutoProxyRegistrar: 给容器中注册一个InfrastructureAdvisorAutoProxyCreator 组件;
* 利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),
* 代理对象执行方法利用拦截器链进行调用;
*
* 2、ProxyTransactionManagementConfiguration: 就是一个配置类,定义了事务增强器、拦截器。
*
*/
switch (adviceMode) {
case PROXY:
// 重点看这2个类:
// AutoProxyRegistrar
// ProxyTransactionManagementConfiguration
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
}
# 事务核心类 - AutoProxyRegistrar 源码
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
// 循环遍历导入类上使用的所有注解(主要处理注解中有 mode和 proxyTargetClass属性的)
Set<String> annoTypes = importingClassMetadata.getAnnotationTypes();
for (String annoType : annoTypes) {
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annoType);
if (candidate == null) {
continue;
}
// 获取注解中的 mode 属性
Object mode = candidate.get("mode");
// 获取注解中的 proxyTargetClass 属性
Object proxyTargetClass = candidate.get("proxyTargetClass");
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
Boolean.class == proxyTargetClass.getClass()) {
candidateFound = true;
// 如果 mode 是代理模式
if (mode == AdviceMode.PROXY) {
// 注册事务AOP的入口类 InfrastructureAdvisorAutoProxyCreator,实际上这个AOP入口类起不了作用
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean) proxyTargetClass) {
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
}
# 事务核心类 - ProxyTransactionManagementConfiguration 源码
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
//
/**
* TODO : 定义事务增强器(事务织入)
*
* 定义了一个 advisor,设置事务属性、设置事务拦截器 TransactionInterceptor、设置顺序。
* 核心就是事务拦截器 TransactionInterceptor。
*/
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource());
advisor.setAdvice(transactionInterceptor());
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
// 主要负责解析 @Transactional注解里的属性资源
// 并包装为 TransactionAttribute 类型
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
// 定义事务拦截器
public TransactionInterceptor transactionInterceptor() {
// TransactionInterceptor 实现了 MethodInterceptor 接口,
// 在链式调用中会自动调用它的 invoke 方法
TransactionInterceptor interceptor = new TransactionInterceptor();
// 设置 TransactionAttribute 属性
interceptor.setTransactionAttributeSource(transactionAttributeSource());
// 如果当前事务管理器不为空,设置当前事务管理器到拦截器中
// 事务管理器和数据源绑定,此处可以自定义
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
知识点
此处可以参考父类AbstractTransactionManagementConfiguration
中的setConfigurers
方法,通过重写TransactionManagementConfigurer
的annotationDrivenTransactionManager
方法来自定义一个事务管理器。
@Component
public class TransactionManagementConfigurerBean implements TransactionManagementConfigurer {
@Autowired
private DataSource dataSource;
@Override
public PlatformTransactionManager annotationDrivenTransactionManager() {
DataSourceTransactionManager dtm = new DataSourceTransactionManager();
dtm.setDataSource(dataSource);
return dtm;
}
}
到此为止,Spring中
@EnableTransactionManagement
注解的内部注册解析的准备工作完成。
# 事务拦截器
TransactionInterceptor
Spring声明式事物是基于AOP实现的,如果目标方法存在事物则会对目标对象进行增强代理(JDK/Cglib)。而TransactionInterceptor
则是事物体系中的增强器(Advise),它实现了MethodInterceptor
接口,TransactionInterceptor
会被封装在使用了事务注解@Transactional
的bean组件外面形成该组件的代理对象,当调用相应使用事务注解的方法时,TransactionInterceptor
的invoke
方法拦截器逻辑会被调用执行,从而完成相应的事务管理,包括:创建、提交和回滚等底层操作。
# Spring 事务拦截处理流程
spring事务拦截器解析流程
TransactionInterceptor
#invoke
,
├─
invoke Spring 事务拦截器入口├─
invokeWithinTransaction 调用Spring内部事务│ ├─
getTransactionAttributeSource ① 通过事务属性源读取事务的属性配置│ ├─
getTransactionAttribute ② 获取该方法的事务属性(传播机制、隔离级别)│ ├─
determineTransactionManager ③ 查找容器中的PlatformTransactionManager,用于管理事务│ ├─
methodIdentification ④ 获取被代理的类方法名称│ ├─
createTransactionIfNecessary ⑤ 创建事务并保存到TransactionInfo 声明式事务│ ├─
proceedWithInvocation ⑥ 火炬传递方法,拦截器链调用处理│ ├─
completeTransactionAfterThrowing ⑦ 执行业务报错,回滚事务│ ├─
cleanupTransactionInfo ⑧ 清空当前事务信息,重置为老的│ ├─
commitTransactionAfterReturning ⑨ 返回结果之前提交事务│ ├─
execute ① 执行实现TransactionCallback接口的doInTransaction回调方法 编程式事务│ │ ├─
prepareTransactionInfo ② 准备事务信息│ │ │ ├─
new TransactionInfo ③ 创建事务信息对象│ │ │ ├─
newTransactionStatus ④ 为事务信息对象设置事务状态│ │ │ └─
bindToThread ⑤ 把创建的事务信息对象和线程绑定到TreadLocal│ │ ├─
proceedWithInvocation ⑥ 火炬传递方法, 拦截器链调用处理│ │ ├─
rollbackOn ⑦ 事务是否满足对异常进行回滚处理条件│ │ └─
cleanupTransactionInfo ⑧ 初始化所有的单例Bean、ioc、BeanPostProcessor的执行、Aop入口
进入
invoke
方法,类文件: org.springframework.transaction.interceptor.
TransactionInterceptor
/**
* TODO : 事务拦截器的拦截方法
* 事务拦截器TransactionInterceptor回调方法invoke通过调用TransactionAspectSupport事务切面支持类中的
* createTransactionIfNecessary和prepareTransactionInfo方法创建事务对象
*/
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
// 通过 AOP获取事务的目标类
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// 核心方法,调用内部事务处理,实际调用父类 TransactionAspectSupport的 invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
进入
invokeWithinTransaction
方法,父类中实现类文件: org.springframework.transaction.interceptor.
TransactionAspectSupport
/**
* TODO : 调用内部事务
*/
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
// 通过事务属性源 TransactionAttributeSource读取事务的属性配置(名称匹配)
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取对应事务属性.如果事务属性为空(则目标方法不存在事务)
// 事务属性源 NameMatchTransactionAttributeSource 的方法
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 根据事务的属性获取 beanFactory 中的 PlatformTransactionManager (spring事务管理器的顶级接口),
// 获取 Spring事务管理IoC容器配置的事务处理器
// 一般常用实现是 DataSourceTransactiuonManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 目标方法唯一标识(类.方法,如service.ServiceImpl.add)
// 获取目标类指定方法的事务连接点
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
/*
* 区分不同类型的 PlatformTransactionManager事务处理器,不同类型的事务处理器调用方式不同。对
* CallbackPreferringPlatformTransactionManager,需要回调函数来实现事务的创建和提交,对非
* CallbackPreferringPlatformTransactionManager来说,则不需要使用回调函数来实现事务处理。
*/
// 如果txAttr为空或者tm 属于非CallbackPreferringPlatformTransactionManager类型的事务处理器
// 声明式事务的操作
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 使用 getTransaction 和 提交/回滚 调用的标准事务划分。
// 创建事务,将当前事务状态和信息保存到TransactionInfo对象中,重要
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 这里就是一个环绕增强,在这个proceed前后可以自己定义增强实现。
// 回调方法执行,执行目标方法(原有的业务逻辑)
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 根据事务定义的,该异常需要回滚就回滚,否则提交事务
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清空当前事务信息,重置为老的
cleanupTransactionInfo(txInfo);
}
// 返回结果之前提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 编程式事务
// CallbackPreferringPlatformTransactionManager类型的事务处理器
else {
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
// 执行实现T ransactionCallback 接口的 doInTransaction 回调方法
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
// 拦截器链调用处理,使得最后目标对象的方法得到调用
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 如果事务满足对异常进行回滚处理条件
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
// 如果异常是运行时异常,则事务回滚处理
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
//如果不是运行时异常,则提交处理
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
// 提交处理
throwableHolder.throwable = ex;
return null;
}
}
//清除当前线程绑定的事务信息
finally {
cleanupTransactionInfo(txInfo);
}
}); // 到这里都是execute方法
// Check result state: It might indicate a Throwable to rethrow.
// 如果是ThrowableHolder类型的异常,则转换为Throwable抛出,上抛异常
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
// 否则异常不做处理直接抛出
return result;
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
进入
getTransactionAttribute
方法,父类中实现类文件: org.springframework.transaction.interceptor.
AbstractFallbackTransactionAttributeSource
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
if (method.getDeclaringClass() == Object.class) {
return null;
}
// 首先看是否存在缓存值
Object cacheKey = getCacheKey(method, targetClass);
TransactionAttribute cached = this.attributeCache.get(cacheKey);
if (cached != null) {
// 如果缓存的是一个无事务对象,直接返回 null
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
}
else {
return cached;
}
}
// 缓存中不存在
else {
// 重新计算 TransactionAttribute 属性,根据方法和类的类型获取事务信息
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
// 存到缓存中
if (txAttr == null) {
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
}
else {
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
if (txAttr instanceof DefaultTransactionAttribute) {
((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
if (logger.isTraceEnabled()) {
logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
}
// 将事务对象属性放到缓存中,缓存的key与类的类型和方法相关
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}
进入
computeTransactionAttribute
方法,父类中实现类文件: org.springframework.transaction.interceptor.
AbstractFallbackTransactionAttributeSource
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// 如果被代理方法非public类型,则直接返回空,Spring事务不支持非public类型方法
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// 方法可能在接口上,但是我们需要目标类的属性。
// 如果目标类为空,则方法将保持不变。
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class.
// 解析方法上的 @Transactional 属性
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
// 如果方法上没有事务属性,就从类上来获取
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
if (specificMethod != method) {
// Fallback is to look at the original method.
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
知识点
被代理方法是否是public
类型,就是在computeTransactionAttribute
这个方法中进行校验的,Spring中的事务只对public
类型的方法起作用,非public
类型事务失效。
# @Transactional注解的解析
进入
findTransactionAttribute
方法,进入
determineTransactionAttribute
方法,进入
parseTransactionAnnotation
方法,类文件: org.springframework.transaction.annotation.
SpringTransactionAnnotationParser
/**
* TODO : 解析事务注解中的属性
*/
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
// 获取事务传播属性
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
// 获取事务隔离等级
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
// 获取事务获取事务超时时间
rbta.setTimeout(attributes.getNumber("timeout").intValue());
// 获取事务是否只读
rbta.setReadOnly(attributes.getBoolean("readOnly"));
// 获取事务事务管理器 bean的名称
rbta.setQualifier(attributes.getString("value"));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
// 获取事务回滚相关配置
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
到此为止,@Transactional注解的加载工作也完成,注解的属性值已近被加载到spring容器中了,接下来开始具体的事务操作过程。
# 事务的实现过程
创建事务之前的准备工作已经完成了,那么具体事务是如何创建实现的呢?
回到
invokeWithinTransaction
方法,进入
createTransactionIfNecessary
方法,类文件: org.springframework.transaction.interceptor.
TransactionAspectSupport
/**
* TODO : 根据给定的事务属性创建事务对象
*/
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果没有指定名称,则应用方法标识作为事务名称。
// 读取事务方法调用的事务配置属性
if (txAttr != null && txAttr.getName() == null) {
// 如果事务名称为 null,则使用方法的名称(事务连接点标识)作为事务名称,
// 调用一个实现 DelegatingTransactionAttribute 接口的匿名内部类
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
// 使用方法名称作为事务名称
public String getName() {
return joinpointIdentification;
}
};
}
// 事务状态封装了事务执行的状态信息
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 事务处理器创建事务,并且返回当前事务的状态信息
// 核心方法,真正底层创建事务对象的方法
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 准备事务信息,事务信息 TransactionInfo 封装了事务配置和状态信息
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
进入
getTransaction
方法,类文件: org.springframework.transaction.support.
AbstractPlatformTransactionManager
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// doGetTransaction()方法是抽象方法,具体的实现由具体的事务处理器提供
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
// 如果没有配置事务属性,则使用默认的事务属性
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
// 检查当前线程是否存在事务,如果已存在事务,那么需要根据在事务属性中定义的事务传播属性来处理事务的产生(检查传播行为)
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// 处理已存在的事务的情况
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
// 检查事务属性中timeout超时属性设置是否合理
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
// 对事务属性中配置的事务传播特性处理
// 如果当前获取不到存在的事务,且事务传播特性配置的是 mandatory,当前没有事务存在,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 获取不到存在的事务,并且事务传播属性是 REQUIRED、REQUIRES_NEW、NESTED
// 则执行事务挂起,新建事务。
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 挂起当前事务
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
// 不激活和当前线程绑定的事务,因为事务传播特性配置要求创建新的事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建一个新的事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 核心方法:创建事务的调用,具体实现由具体的事务处理器提供。
// 设置事务隔离级别(spring默认的事务隔离级别是跟JDBC相同的,即默认情况Spring不设置事务隔离级别);
doBegin(transaction, definition);
// 初始化和同步事务状态
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
// 创建空事务,针对 supported 类型的事务传播特性,激活和当前线程绑定的事务
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 准备事务状态
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
知识点
抽象事务管理器AbstractPlatformTransactionManager
提供了创建事务的模板,这个模板会被具体的事
务处理器所使用,抽象事务管理器根据事务属性配置和当前线程绑定信息对事务是否需要创建以及如何创建进行一
些通用的处理,然后把事务创建的底层细节交给具体的事务处理器实现。
进入
doGetTransaction
方法,由jdbc包下的数据源类实现类文件: org.springframework.jdbc.datasource.
DataSourceTransactionManager
protected Object doGetTransaction() {
// 管理 connection对象,创建回滚点,按照回滚点回滚,释放回滚点
// DataSourceTransactionObject 就是真正的事务对象
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// DataSourceTransactionManager 默认是允许嵌套事务的
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// obtainDataSource() 获取数据源对象,其实就是数据库连接对象
// ConnectionHolder 持有了 Connection,并对 Connection进行了包装
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
进入
getResource
方法,类文件: org.springframework.transaction.support.
TransactionSynchronizationManager
public static Object getResource(Object key) {
// 查看数据源连接池有没有扩展,一般没有
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
进入
doGetResource
方法,类文件: org.springframework.transaction.support.
TransactionSynchronizationManager
private static Object doGetResource(Object actualKey) {
// 事务默认传播属性会共用同一个连接,因此会去 ThreadLocal 中获取连接,如果存在就直接返回
// map对应了当前数据源对象和 当前连接的绑定关系
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
执行到
doGetResource
,也就说明了,事务的执行默认使用了同一个数据库连接。
doGetTransaction
方法执行完返回了事务对象,返回到getTransaction
方法中进入
doBegin
方法类文件: org.springframework.jdbc.datasource.
DataSourceTransactionManager
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 如果没有数据库连接
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 从连接池里面获取连接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 把连接包装成 ConnectionHolder,然后设置到事务对象中
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 从数据库连接中获取隔离级别,Mysql默认隔离级别是 可重复读
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 关闭连接的自动提交,其实这步就是开启了事务
con.setAutoCommit(false);
}
// 设置只读事务 从这一点设置的时间点开始(时间点a)到这个事务结束的过程中,
// 其他事务所提交的数据,该事务将看不见!
// 设置只读事务就是告诉数据库,我这个事务内没有新增,修改,删除操作
// 只有查询操作,不需要数据库锁等操作,减少数据库压力
prepareTransactionalConnection(con, definition);
// 自己提交关闭了,就说明已经开启事务了,事务是活动的
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
// 如果是新创建的事务,则建立当前线程和数据库连接的关系
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
doBegin
方法中的con.setAutoCommit(false)
执行完,正式开启了Spring 事务管理功能。
# 事务问题汇总
问题一
AutoProxyRegistrar
是AOP的入口类,之前@EnableAspectJAutoProxy
也会通过AutoProxyRegistrar
生成一个AOP的入口类,那么AOP的入口类会有两个吗?
参考答案
不会,Spring AOP创建的入口类名字默认都叫org.springframework.aop.config.internalAutoProxyCreator
,并且优先级设置在AopConfigUtils
类中:
static {
// 事务代理 入口类优 先级最低
APC_PRIORITY_LIST.add(InfrastructureAdvisorAutoProxyCreator.class);
// XML代理 入口类 优先级其次
APC_PRIORITY_LIST.add(AspectJAwareAdvisorAutoProxyCreator.class);
// AspectJ代理 入口类 优先级最高
APC_PRIORITY_LIST.add(AnnotationAwareAspectJAutoProxyCreator.class);
}
由此可见,AspectJ的入口类AnnotationAwareAspectJAutoProxyCreator
优先级最高,也就是说如果设置了切面,那么入口类只会有AnnotationAwareAspectJAutoProxyCreator
会起作用。
问题二
两个方法上都使用了@Transactional默认传播属性,内部是如何保证使用了同一个连接的?
参考答案
事务默认传播属性会共用同一个连接,内部是通过ThreadLocal
来保证连接的重复可用。默认第一次肯定不会从Map
中拿到连接,直接返回null
,只有第二次进来get()
的值才不会为空。
private static Object doGetResource(Object actualKey) {
// 事务默认传播属性会共用同一个连接,因此会去 ThreadLocal 中获取连接,
// 如果存在就直接返回,否则创建新的连接
// Map保存了当前数据源对象 和 当前连接的绑定关系
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}